package connector

import (
	"context"
	"time"

	"github.com/seal-io/walrus/pkg/apis/runtime"
	pkgconn "github.com/seal-io/walrus/pkg/connectors"
	"github.com/seal-io/walrus/pkg/costs/deployer"
	"github.com/seal-io/walrus/pkg/dao/model"
	"github.com/seal-io/walrus/pkg/dao/model/connector"
	"github.com/seal-io/walrus/pkg/dao/types"
	"github.com/seal-io/walrus/pkg/dao/types/status"
	"github.com/seal-io/walrus/pkg/datalisten/modelchange"
	"github.com/seal-io/walrus/utils/gopool"
	"github.com/seal-io/walrus/utils/log"
	"github.com/seal-io/walrus/utils/topic"
)

// Create stores a new connector built from the request and returns its
// exposed form.
//
// When the connector is of the K8s type and has FinOps enabled, its cost
// related statuses are marked as unknown before saving. After saving,
// applyFinOps is invoked with the stored connector.
func (h Handler) Create(req CreateRequest) (CreateResponse, error) {
	input := req.Model()

	// TODO(thxCode): generated by entc.
	finOpsOnK8s := input.Type == types.ConnectorTypeK8s && input.EnableFinOps
	if finOpsOnK8s {
		status.ConnectorStatusCostToolsDeployed.Unknown(input, "Deploying cost tools")
		status.ConnectorStatusCostSynced.Unknown(input,
			"It takes about an hour to generate hour-level cost data")
	}

	// Refresh the summary so it reflects the statuses set above.
	summary := status.WalkConnector(&input.Status)
	input.Status.SetSummary(summary)

	created, err := h.modelClient.Connectors().Create().
		Set(input).
		Save(req.Context)
	if err != nil {
		return nil, err
	}

	if err = applyFinOps(h.modelClient, created, false); err != nil {
		return nil, err
	}

	return model.ExposeConnector(created), nil
}

// Get looks up the connector with the requested ID and returns its exposed
// form, or the lookup error.
func (h Handler) Get(req GetRequest) (GetResponse, error) {
	connectors := h.modelClient.Connectors()

	found, err := connectors.Get(req.Context, req.ID)
	if err != nil {
		return nil, err
	}

	return model.ExposeConnector(found), nil
}

// Update saves the connector built from the request and then invokes
// applyFinOps with the stored result.
func (h Handler) Update(req UpdateRequest) error {
	input := req.Model()

	updated, err := h.modelClient.Connectors().UpdateOne(input).
		Set(input).
		Save(req.Context)
	if err != nil {
		return err
	}

	return applyFinOps(h.modelClient, updated, false)
}

// Delete removes the connector with the requested ID.
func (h Handler) Delete(req DeleteRequest) error {
	deletion := h.modelClient.Connectors().DeleteOneID(req.ID)

	return deletion.Exec(req.Context)
}

var (
	// queryFields is handed to the request's Querying helper when
	// CollectionGet builds its filter.
	queryFields = []string{connector.FieldName}

	// getFields is handed to the request's Extracting helper, as both of its
	// arguments, to decide which fields CollectionGet selects.
	// NOTE(review): presumably every connector field except the update
	// time; confirm against connector.WithoutFields.
	getFields = connector.WithoutFields(connector.FieldUpdateTime)

	// sortFields is handed to the request's Sorting helper when
	// CollectionGet builds its ordering.
	sortFields = []string{
		connector.FieldName,
		connector.FieldType,
		connector.FieldCreateTime,
	}
)

// CollectionGet lists the connectors matching the request or, when the
// request carries a stream, keeps sending connector change events to it.
//
// Scope: with a project, only that project's connectors are matched, plus
// the project-less ones when WithGlobal is set; without a project, only
// project-less connectors are matched. Category, Type and the request's
// query conditions narrow the result further.
//
// A normal request returns the exposed connectors together with the count
// taken before paging is applied. The stream branch loops until an error
// occurs and returns that error.
func (h Handler) CollectionGet(req CollectionGetRequest) (CollectionGetResponse, int, error) {
	query := h.modelClient.Connectors().Query()

	switch {
	case req.Project == nil:
		// Global scope.
		query.Where(connector.ProjectIDIsNil())
	case req.WithGlobal:
		// Project scope with global.
		query.Where(connector.Or(
			connector.ProjectID(req.Project.ID),
			connector.ProjectIDIsNil()))
	default:
		// Project scope only.
		query.Where(connector.ProjectID(req.Project.ID))
	}

	if req.Category != "" {
		query.Where(connector.Category(req.Category))
	}

	if req.Type != "" {
		query.Where(connector.Type(req.Type))
	}

	if cond, found := req.Querying(queryFields); found {
		query.Where(cond)
	}

	if req.Stream != nil {
		// Handle stream request.
		stream := req.Stream

		if selected, found := req.Extracting(getFields, getFields...); found {
			query.Select(selected...)
		}

		if ordering, found := req.Sorting(sortFields, model.Desc(connector.FieldCreateTime)); found {
			query.Order(ordering...)
		}

		sub, err := topic.Subscribe(modelchange.Connector)
		if err != nil {
			return nil, 0, err
		}

		defer sub.Unsubscribe()

		for {
			event, recvErr := sub.Receive(stream)
			if recvErr != nil {
				return nil, 0, recvErr
			}

			// Ignore events that do not carry a model change.
			change, isChange := event.Data.(modelchange.Event)
			if !isChange {
				continue
			}

			var outputs []*model.ConnectorOutput

			switch change.Type {
			case modelchange.EventTypeCreate, modelchange.EventTypeUpdate:
				// Re-query the changed IDs through the request's filters.
				changed, queryErr := query.Clone().
					Where(connector.IDIn(change.IDs...)).
					// Must append project ID.
					Select(connector.FieldProjectID).
					Unique(false).
					All(stream)
				if queryErr != nil {
					return nil, 0, queryErr
				}

				outputs = model.ExposeConnectors(changed)
			case modelchange.EventTypeDelete:
				// Deleted connectors are reported by ID only.
				outputs = make([]*model.ConnectorOutput, 0, len(change.IDs))
				for _, id := range change.IDs {
					outputs = append(outputs, &model.ConnectorOutput{
						ID: id,
					})
				}
			}

			// Nothing to report for this event.
			if len(outputs) == 0 {
				continue
			}

			payload := runtime.TypedResponse(change.Type.String(), outputs)
			if sendErr := stream.SendJSON(payload); sendErr != nil {
				return nil, 0, sendErr
			}
		}
	}

	// Handle normal request.

	// Count on a clone, before paging, selection and ordering are applied.
	total, err := query.Clone().Count(req.Context)
	if err != nil {
		return nil, 0, err
	}

	// Get entities.
	if limit, offset, paged := req.Paging(); paged {
		query.Limit(limit).Offset(offset)
	}

	if selected, found := req.Extracting(getFields, getFields...); found {
		query.Select(selected...)
	}

	if ordering, found := req.Sorting(sortFields, model.Desc(connector.FieldCreateTime)); found {
		query.Order(ordering...)
	}

	listed, err := query.
		// Must append project ID.
		Select(connector.FieldProjectID).
		Unique(false).
		All(req.Context)
	if err != nil {
		return nil, 0, err
	}

	return model.ExposeConnectors(listed), total, nil
}

// CollectionDelete removes every connector whose ID is listed in the
// request, running the deletion through the model client's WithTx.
func (h Handler) CollectionDelete(req CollectionDeleteRequest) error {
	ids := req.IDs()

	deleteListed := func(tx *model.Tx) error {
		deletion := tx.Connectors().Delete().
			Where(connector.IDIn(ids...))

		// The number of deleted rows is not needed.
		_, err := deletion.Exec(req.Context)

		return err
	}

	return h.modelClient.WithTx(req.Context, deleteListed)
}

// applyFinOps schedules a background job that (re)installs the cost tools,
// updates the custom pricing and then syncs the connector status, all under
// one shared 3-minute timeout.
//
// Nothing is scheduled unless the connector is of the Kubernetes category
// and has FinOps enabled. Failures inside the job are logged rather than
// reported, so the returned error is always nil.
func applyFinOps(mc model.ClientSet, conn *model.Connector, reinstall bool) error {
	// Only Kubernetes connectors with FinOps enabled are handled.
	if conn.Category != types.ConnectorCategoryKubernetes || !conn.EnableFinOps {
		return nil
	}

	job := func() {
		logger := log.WithName("api").WithName("connector")

		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
		defer cancel()

		// Deploy tools.
		if err := deployer.DeployCostTools(ctx, conn, reinstall); err != nil {
			// Log instead of return error, then continue to sync the final status to connector.
			logger.Errorf("error ensuring cost tools for connector %q: %v", conn.ID, err)
		}

		// Update pricing.
		if err := deployer.UpdateCustomPricing(ctx, conn); err != nil {
			logger.Errorf("error updating custom pricing to connector %q: %v", conn.ID, err)
		}

		// Sync status.
		syncer := pkgconn.NewStatusSyncer(mc)
		if err := syncer.SyncStatus(ctx, conn, true); err != nil {
			logger.Errorf("error syncing status of connector %q: %v", conn.ID, err)
		}
	}

	gopool.Go(job)

	return nil
}
